package com.kaigejava.studynote.edge.iotplatform.impl;

import com.kaigejava.studynote.edge.dto.ResultDTO;
import com.kaigejava.studynote.edge.iotplatform.IotPlatformClient;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @BelongsProject: kaigejavastudy
 * @BelongsPackage: com.kaigejava.studynote.edge.iotplatform.impl
 * @Author: kaigejava
 * @CreateTime: 2023-12-06  15:09
 * @Description: 物联网平台交换的实现类
 * @Version: 1.0
 */
@Slf4j
public class IotPlatformClientImpl implements IotPlatformClient {

    /**
     * 内存队列对象
     */
    private final Queue<String> gatewayDatasToReport = new ArrayBlockingQueue<>(60);
    /**
     * 线程池
     */
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(24, new DefaultThreadFactory("abq-"));


    @Override
    public void reportToIot(ResultDTO resultDTO, Long time) {
        gatewayDataReport(time, resultDTO);
    }

    public void gatewayDataReport(Long timestamp, ResultDTO msg) {
        if (Objects.isNull(msg)) {
            System.out.println("消息为空,不处理");
            return;
        }

        synchronized (gatewayDatasToReport) {
            //将消息投放到队列中
            System.out.println("【队列生产者投递】当前时间是:"+timestamp);
            gatewayDatasToReport.offer("【" + timestamp + "】.获取到的消息内容为:" + msg);
            gatewayDatasToReport.notifyAll();
           // System.out.println("队列的长度是："+gatewayDatasToReport.size());
        }
    }

    /**
     * 消费者方法
     *
     * @param delay sleep时间
     */
    public void initGateReportService(final int delay) {
        scheduledExecutorService.submit(() -> {
            while (true) {
                try {
                    synchronized (gatewayDatasToReport) {
                        while (gatewayDatasToReport.isEmpty()) {
                            gatewayDatasToReport.wait(1);
                        }
                        sleep(delay);
                        reportGatewayPropertyUntilEmpty();
                        // 通知生产者
                        gatewayDatasToReport.notifyAll();
                    }
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    log.error("网关数据上报异常", e);
                }
            }
        });
        log.info("消费进程启动完成");
    }


    /**
     * 消费等待上数据，直到消费完
     */
    private void reportGatewayPropertyUntilEmpty() {

        while (!gatewayDatasToReport.isEmpty()) {
            //获取队列中的数据
            String msg = gatewayDatasToReport.remove();
            System.out.println("【队列消费者】从队列中获取到的数据是：" + msg);
            System.out.println("----上报云平台【完成】---");
        }
    }

    private void sleep(int second) {
        try {
            TimeUnit.SECONDS.sleep(second);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
